Ask AI

You are viewing an unreleased or outdated version of the documentation

Delta Lake (dagster-deltalake)

This library provides an integration with the Delta Lake storage framework.

Related Guides:

dagster_deltalake.DeltaLakeIOManager IOManagerDefinition[source]

Config Schema:
root_uri (dagster.StringSource):

Storage location where Delta tables are stored.

mode (Union[WriteMode, None], optional):

The write mode passed to save the output.

Default Value: ‘overwrite’

overwrite_schema (Union[dagster.BoolSource, None], optional):

Default Value: False

writer_engine (Union[WriterEngine, None], optional):

Engine passed to write_deltalake.

Default Value: ‘pyarrow’

storage_options (selector):
Config Schema:
azure (strict dict):

Storage configuration for Microsoft Azure Blob or ADLS Gen 2 object store.

Config Schema:
account_name (dagster.StringSource):

client_id (Union[dagster.StringSource, None], optional):

Default Value: None

client_secret (Union[dagster.StringSource, None], optional):

Default Value: None

tenant_id (Union[dagster.StringSource, None], optional):

Default Value: None

federated_token_file (Union[dagster.StringSource, None], optional):

Default Value: None

account_key (Union[dagster.StringSource, None], optional):

Default Value: None

sas_key (Union[dagster.StringSource, None], optional):

Default Value: None

token (Union[dagster.StringSource, None], optional):

Default Value: None

use_azure_cli (Union[dagster.BoolSource, None], optional):

Default Value: None

use_fabric_endpoint (Union[dagster.BoolSource, None], optional):

Default Value: None

msi_resource_id (Union[dagster.StringSource, None], optional):

Default Value: None

msi_endpoint (Union[dagster.StringSource, None], optional):

Default Value: None

container_name (Union[dagster.StringSource, None], optional):

Default Value: None

s3 (strict dict, optional):

Storage configuration for Amazon Web Services (AWS) S3 object store.

Default Value:
{
    "access_key_id": null,
    "secret_access_key": null,
    "region": null,
    "bucket": null,
    "endpoint": null,
    "token": null,
    "imdsv1_fallback": false,
    "virtual_hosted_style_request": null,
    "unsigned_payload": null,
    "checksum": null,
    "metadata_endpoint": null,
    "container_credentials_relative_uri": null,
    "copy_if_not_exists": null,
    "allow_unsafe_rename": null
}
Config Schema:
access_key_id (Union[dagster.StringSource, None], optional):

Default Value: None

secret_access_key (Union[dagster.StringSource, None], optional):

Default Value: None

region (Union[dagster.StringSource, None], optional):

Default Value: None

bucket (Union[dagster.StringSource, None], optional):

Default Value: None

endpoint (Union[dagster.StringSource, None], optional):

Default Value: None

token (Union[dagster.StringSource, None], optional):

Default Value: None

imdsv1_fallback (Union[dagster.BoolSource, None], optional):

Default Value: False

virtual_hosted_style_request (Union[dagster.StringSource, None], optional):

Default Value: None

unsigned_payload (Union[dagster.BoolSource, None], optional):

Default Value: None

checksum (Union[dagster.StringSource, None], optional):

Default Value: None

metadata_endpoint (Union[dagster.StringSource, None], optional):

Default Value: None

container_credentials_relative_uri (Union[dagster.StringSource, None], optional):

Default Value: None

copy_if_not_exists (Union[dagster.StringSource, None], optional):

Default Value: None

allow_unsafe_rename (Union[dagster.BoolSource, None], optional):

Default Value: None

local (strict dict, optional):

Storage configuration for local object store.

Default Value:
{}
gcs (strict dict, optional):

Storage configuration for Google Cloud Storage object store.

Default Value:
{
    "service_account": null,
    "service_account_key": null,
    "bucket": null,
    "application_credentials": null
}
Config Schema:
service_account (Union[dagster.StringSource, None], optional):

Default Value: None

service_account_key (Union[dagster.StringSource, None], optional):

Default Value: None

bucket (Union[dagster.StringSource, None], optional):

Default Value: None

application_credentials (Union[dagster.StringSource, None], optional):

Default Value: None

client_options (Union[strict dict, None], optional):

Additional configuration passed to http client.

Default Value: None

table_config (Union[dict, None], optional):

Additional config and metadata added to table on creation.

Default Value: None

schema (Union[dagster.StringSource, None], optional):

Name of the schema to use.

Default Value: None

custom_metadata (Union[dict, None], optional):

Custom metadata that is added to transaction commit.

Default Value: None

writer_properties (Union[dict, None], optional):

Writer properties passed to the rust engine writer.

Default Value: None

Base class for an IO manager definition that reads inputs from and writes outputs to Delta Lake.

Examples

from dagster_deltalake import DeltaLakeIOManager
from dagster_deltalake_pandas import DeltaLakePandasTypeHandler

class MyDeltaLakeIOManager(DeltaLakeIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        return [DeltaLakePandasTypeHandler()]

@asset(
    key_prefix=["my_schema"]  # will be used as the schema (parent folder) in Delta Lake
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={"io_manager": MyDeltaLakeIOManager()}
)

If you do not provide a schema, Dagster will determine a schema based on the assets and ops using the I/O Manager. For assets, the schema will be determined from the asset key, as in the above example. For ops, the schema can be specified by including a “schema” entry in output metadata. If none of these is provided, the schema will default to “public”.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame):
    # my_table will just contain the data from column "a"
    ...
dagster_deltalake.DeltaLakePyarrowIOManager IOManagerDefinition[source]

Config Schema:
root_uri (dagster.StringSource):

Storage location where Delta tables are stored.

mode (Union[WriteMode, None], optional):

The write mode passed to save the output.

Default Value: ‘overwrite’

overwrite_schema (Union[dagster.BoolSource, None], optional):

Default Value: False

writer_engine (Union[WriterEngine, None], optional):

Engine passed to write_deltalake.

Default Value: ‘pyarrow’

storage_options (selector):
Config Schema:
azure (strict dict):

Storage configuration for Microsoft Azure Blob or ADLS Gen 2 object store.

Config Schema:
account_name (dagster.StringSource):

client_id (Union[dagster.StringSource, None], optional):

Default Value: None

client_secret (Union[dagster.StringSource, None], optional):

Default Value: None

tenant_id (Union[dagster.StringSource, None], optional):

Default Value: None

federated_token_file (Union[dagster.StringSource, None], optional):

Default Value: None

account_key (Union[dagster.StringSource, None], optional):

Default Value: None

sas_key (Union[dagster.StringSource, None], optional):

Default Value: None

token (Union[dagster.StringSource, None], optional):

Default Value: None

use_azure_cli (Union[dagster.BoolSource, None], optional):

Default Value: None

use_fabric_endpoint (Union[dagster.BoolSource, None], optional):

Default Value: None

msi_resource_id (Union[dagster.StringSource, None], optional):

Default Value: None

msi_endpoint (Union[dagster.StringSource, None], optional):

Default Value: None

container_name (Union[dagster.StringSource, None], optional):

Default Value: None

s3 (strict dict, optional):

Storage configuration for Amazon Web Services (AWS) S3 object store.

Default Value:
{
    "access_key_id": null,
    "secret_access_key": null,
    "region": null,
    "bucket": null,
    "endpoint": null,
    "token": null,
    "imdsv1_fallback": false,
    "virtual_hosted_style_request": null,
    "unsigned_payload": null,
    "checksum": null,
    "metadata_endpoint": null,
    "container_credentials_relative_uri": null,
    "copy_if_not_exists": null,
    "allow_unsafe_rename": null
}
Config Schema:
access_key_id (Union[dagster.StringSource, None], optional):

Default Value: None

secret_access_key (Union[dagster.StringSource, None], optional):

Default Value: None

region (Union[dagster.StringSource, None], optional):

Default Value: None

bucket (Union[dagster.StringSource, None], optional):

Default Value: None

endpoint (Union[dagster.StringSource, None], optional):

Default Value: None

token (Union[dagster.StringSource, None], optional):

Default Value: None

imdsv1_fallback (Union[dagster.BoolSource, None], optional):

Default Value: False

virtual_hosted_style_request (Union[dagster.StringSource, None], optional):

Default Value: None

unsigned_payload (Union[dagster.BoolSource, None], optional):

Default Value: None

checksum (Union[dagster.StringSource, None], optional):

Default Value: None

metadata_endpoint (Union[dagster.StringSource, None], optional):

Default Value: None

container_credentials_relative_uri (Union[dagster.StringSource, None], optional):

Default Value: None

copy_if_not_exists (Union[dagster.StringSource, None], optional):

Default Value: None

allow_unsafe_rename (Union[dagster.BoolSource, None], optional):

Default Value: None

local (strict dict, optional):

Storage configuration for local object store.

Default Value:
{}
gcs (strict dict, optional):

Storage configuration for Google Cloud Storage object store.

Default Value:
{
    "service_account": null,
    "service_account_key": null,
    "bucket": null,
    "application_credentials": null
}
Config Schema:
service_account (Union[dagster.StringSource, None], optional):

Default Value: None

service_account_key (Union[dagster.StringSource, None], optional):

Default Value: None

bucket (Union[dagster.StringSource, None], optional):

Default Value: None

application_credentials (Union[dagster.StringSource, None], optional):

Default Value: None

client_options (Union[strict dict, None], optional):

Additional configuration passed to http client.

Default Value: None

table_config (Union[dict, None], optional):

Additional config and metadata added to table on creation.

Default Value: None

schema (Union[dagster.StringSource, None], optional):

Name of the schema to use.

Default Value: None

custom_metadata (Union[dict, None], optional):

Custom metadata that is added to transaction commit.

Default Value: None

writer_properties (Union[dict, None], optional):

Writer properties passed to the rust engine writer.

Default Value: None

Base class for an IO manager definition that reads inputs from and writes outputs to Delta Lake.

Examples

from dagster_deltalake import DeltaLakeIOManager
from dagster_deltalake_pandas import DeltaLakePandasTypeHandler

class MyDeltaLakeIOManager(DeltaLakeIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        return [DeltaLakePandasTypeHandler()]

@asset(
    key_prefix=["my_schema"]  # will be used as the schema (parent folder) in Delta Lake
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={"io_manager": MyDeltaLakeIOManager()}
)

If you do not provide a schema, Dagster will determine a schema based on the assets and ops using the I/O Manager. For assets, the schema will be determined from the asset key, as in the above example. For ops, the schema can be specified by including a “schema” entry in output metadata. If none of these is provided, the schema will default to “public”.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame):
    # my_table will just contain the data from column "a"
    ...
dagster_deltalake.DeltaTableResource ResourceDefinition[source]

Config Schema:
url (dagster.StringSource):

storage_options (selector):
Config Schema:
azure (strict dict):

Storage configuration for Microsoft Azure Blob or ADLS Gen 2 object store.

Config Schema:
account_name (dagster.StringSource):

client_id (Union[dagster.StringSource, None], optional):

Default Value: None

client_secret (Union[dagster.StringSource, None], optional):

Default Value: None

tenant_id (Union[dagster.StringSource, None], optional):

Default Value: None

federated_token_file (Union[dagster.StringSource, None], optional):

Default Value: None

account_key (Union[dagster.StringSource, None], optional):

Default Value: None

sas_key (Union[dagster.StringSource, None], optional):

Default Value: None

token (Union[dagster.StringSource, None], optional):

Default Value: None

use_azure_cli (Union[dagster.BoolSource, None], optional):

Default Value: None

use_fabric_endpoint (Union[dagster.BoolSource, None], optional):

Default Value: None

msi_resource_id (Union[dagster.StringSource, None], optional):

Default Value: None

msi_endpoint (Union[dagster.StringSource, None], optional):

Default Value: None

container_name (Union[dagster.StringSource, None], optional):

Default Value: None

s3 (strict dict, optional):

Storage configuration for Amazon Web Services (AWS) S3 object store.

Default Value:
{
    "access_key_id": null,
    "secret_access_key": null,
    "region": null,
    "bucket": null,
    "endpoint": null,
    "token": null,
    "imdsv1_fallback": false,
    "virtual_hosted_style_request": null,
    "unsigned_payload": null,
    "checksum": null,
    "metadata_endpoint": null,
    "container_credentials_relative_uri": null,
    "copy_if_not_exists": null,
    "allow_unsafe_rename": null
}
Config Schema:
access_key_id (Union[dagster.StringSource, None], optional):

Default Value: None

secret_access_key (Union[dagster.StringSource, None], optional):

Default Value: None

region (Union[dagster.StringSource, None], optional):

Default Value: None

bucket (Union[dagster.StringSource, None], optional):

Default Value: None

endpoint (Union[dagster.StringSource, None], optional):

Default Value: None

token (Union[dagster.StringSource, None], optional):

Default Value: None

imdsv1_fallback (Union[dagster.BoolSource, None], optional):

Default Value: False

virtual_hosted_style_request (Union[dagster.StringSource, None], optional):

Default Value: None

unsigned_payload (Union[dagster.BoolSource, None], optional):

Default Value: None

checksum (Union[dagster.StringSource, None], optional):

Default Value: None

metadata_endpoint (Union[dagster.StringSource, None], optional):

Default Value: None

container_credentials_relative_uri (Union[dagster.StringSource, None], optional):

Default Value: None

copy_if_not_exists (Union[dagster.StringSource, None], optional):

Default Value: None

allow_unsafe_rename (Union[dagster.BoolSource, None], optional):

Default Value: None

local (strict dict, optional):

Storage configuration for local object store.

Default Value:
{}
gcs (strict dict, optional):

Storage configuration for Google Cloud Storage object store.

Default Value:
{
    "service_account": null,
    "service_account_key": null,
    "bucket": null,
    "application_credentials": null
}
Config Schema:
service_account (Union[dagster.StringSource, None], optional):

Default Value: None

service_account_key (Union[dagster.StringSource, None], optional):

Default Value: None

bucket (Union[dagster.StringSource, None], optional):

Default Value: None

application_credentials (Union[dagster.StringSource, None], optional):

Default Value: None

client_options (Union[strict dict, None], optional):

Additional configuration passed to http client.

Default Value: None

version (Union[dagster.IntSource, None], optional):

Version to load delta table.

Default Value: None

Resource for interacting with a Delta table.

Examples

from dagster import Definitions, asset
from dagster_deltalake import DeltaTableResource, LocalConfig

@asset
def my_table(delta_table: DeltaTableResource):
    df = delta_table.load().to_pandas()

defs = Definitions(
    assets=[my_table],
    resources={
        "delta_table": DeltaTableResource(
            url="/path/to/table",
            storage_options=LocalConfig()
        )
    }
)